Kafka安装及使用

646次阅读
没有评论

共计 2914 个字符,预计需要花费 8 分钟才能阅读完成。

一. Docker 安装

https://blog.csdn.net/ch_improve/article/details/114268915

1. 下载

# 下载 zookeeper 镜像
docker pull wurstmeister/zookeeper

# 下载 kafka 镜像
docker pull wurstmeister/kafka

2. 启动

  • 方式一 : docker 启动
# 启动 zookeeper
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper

# 启动 kafka
docker run -d --name kafka3 --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
  • 方式二 : docker-compose 启动
# docker-compose.yml
version: '2'
services:
  zookeeper:
    image: "wurstmeister/zookeeper"
    hostname: "zookeeper.local"
    container_name: "zookeeper"
    networks:
      local:
        aliases:
          - "zookeeper.local"
  kafka:
    image: "wurstmeister/kafka"
    hostname: "kafka.local"
    container_name: "kafka"
    ports:
      - "9092:9092"
    networks:
      local:
        aliases:
          - "kafka.local"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 10.64.66.118  # 对外开放的 ip
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

networks:
  local:
    driver: bridge
# 简便写法
version: '2'
services:
    zookeeper:
        image: wurstmeister/zookeeper
        volumes:
        - ./data:/data
        ports:
            - "2181:2181"

    kafka:
        image: wurstmeister/kafka
        ports:
            - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: 10.64.66.118
          KAFKA_MESSAGE_MAX_BYTES: 2000000
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
networks:
  local:
    driver: bridge

启动命令 : docker-compose up -d

二. 终端测试

1. 打开两个命令框,都进入 kafka 容器内

docker exec -it kafka bash

2. 一个命令框测试创建 topic 并订阅

# 连接 zookeeper,创建一个 topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 -replication-factor 1 --partitions 1 --topic test01
# 查看 topic
/opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
# 消费者订阅该 topic
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka.local:9092 --topic test01 --from-beginning

第一条命令报错:Error while fetching metadata with correl

Kafka 安装及使用

原因:无法连接 zookeeper, 重启该容器即可

3. 另一个终端测试生产数据

# 生产者
/opt/kafka/bin/kafka-console-producer.sh --broker-list kafka.local:9092 --topic test01

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "RCU_TDM_TOPIC"

测试:

  • 在生产者终端不断发消息,在消费者终端可以收到消息

三.Python 操作 Kafka 测试

1. 测试

  • server.py
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time

class KafkaPythonDemo(object):
    def __init__(self):
        self.producer = KafkaProducer(bootstrap_servers=["10.64.66.118:9092"])
        self.topic = 'test1'

    def producerTest(self):
        print('------ producer begins ------')
        n = 1
        try:
            while (n <= 100):
                self.producer.send(self.topic, str(n).encode())
                print(f'send {str(n)}')
                n += 1
                time.sleep(0.5)
        except KafkaError as e:
            print(e)
        finally:
            self.producer.close()
            print('======= 生产完毕 =========')

if __name__ == "__main__":
    kafkaPythonDemo = KafkaPythonDemo()
    kafkaPythonDemo.producerTest()
  • client.py
from kafka import KafkaConsumer

consumer = KafkaConsumer('test1', bootstrap_servers='10.64.66.118:9092')
for msg in consumer:
    print(msg.value.decode())

2. 对外开放连接配置问题

  • kafka 在运行在虚拟机容器内

  • 映射 9092 端口

在虚拟机内可以连接 kafka, 在 windows 宿主机上无法连接

Kafka 安装及使用

  • 解决:修改 server.properties 配置文件
advertised.listeners=PLAINTEXT://10.64.66.118:9092  # 对外地址 
正文完
 
shawn
版权声明:本站原创文章,由 shawn 2023-06-12发表,共计2914字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)